from contextlib import contextmanager
import logging
import os
import socket
import sys
import uuid
import sentry_sdk
from enum import Enum
import requests
from posthog import Posthog
from typing import List


class Feature(Enum):
    """Product features tracked in the telemetry data file.

    Member values are the strings persisted under ``DEEPTEAM_LAST_FEATURE``
    and, upper-cased, are embedded in the per-feature status keys
    (``DEEPTEAM_<VALUE>_STATUS``).
    """

    # The red-teaming feature; recorded by ``capture_red_teamer_run``.
    REDTEAMING = "redteaming"
    # Fallback returned by ``get_last_feature`` when no valid feature is stored.
    UNKNOWN = "unknown"


# Relative path of the ``key=value`` text file that stores the anonymous ID
# and the new/old status flags used by the telemetry helper functions.
TELEMETRY_DATA_FILE = ".deepteam/telemetry.txt"


#########################################################
### Telemetry Config ####################################
#########################################################


def telemetry_opt_out():
    """Report whether telemetry is disabled for this process.

    Reads the ``DEEPTEAM_TELEMETRY_OPT_OUT`` environment variable. Telemetry
    counts as opted out when the variable is unset (it defaults to ``"YES"``)
    or is exactly ``"YES"``; any other value enables telemetry.

    Returns:
        bool: True when telemetry is opted out, False otherwise.
    """
    flag = os.environ.get("DEEPTEAM_TELEMETRY_OPT_OUT", "YES")
    return flag == "YES"


def blocked_by_firewall(timeout: float = 5.0) -> bool:
    """Check whether an outbound TCP connection appears to be blocked.

    Attempts to connect to ``www.google.com`` on port 80 and treats any
    ``OSError`` (DNS failure, refused connection, timeout, ...) as "blocked".

    Args:
        timeout: Maximum number of seconds to wait for the connection before
            giving up, so the probe cannot stall the caller indefinitely.

    Returns:
        False when the connection succeeds, True when it raises ``OSError``.
    """
    try:
        # The probe socket is only needed to prove connectivity; the context
        # manager closes it right away instead of leaking the descriptor.
        with socket.create_connection(("www.google.com", 80), timeout=timeout):
            return False
    except OSError:
        return True


def get_anonymous_public_ip():
    """Look up this machine's public IP address via ``api.ipify.org``.

    The request uses a 5 second timeout. Any ``requests.RequestException``
    is swallowed on purpose: the lookup is best-effort.

    Returns:
        The response body as a string when the service answers with HTTP 200,
        otherwise None (non-200 status or request failure).
    """
    public_ip = None
    try:
        reply = requests.get("https://api.ipify.org", timeout=5)
        if reply.status_code == 200:
            public_ip = reply.text
    except requests.RequestException:
        # Best-effort lookup: fall through and report "unknown".
        public_ip = None
    return public_ip


# Public IP attached to spans as "user.public_ip"; stays None when telemetry is
# opted out or when the lookup below fails.
anonymous_public_ip = None

# Telemetry backends are imported and configured only when the user has not
# opted out. As a consequence `tracer` and `posthog` are defined only in that
# case, and all of this runs as an import-time side effect of the module.
if not telemetry_opt_out():
    from opentelemetry import trace
    from opentelemetry.sdk.trace import TracerProvider
    from opentelemetry.sdk.trace.export import BatchSpanProcessor
    from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import (
        OTLPSpanExporter,
    )

    # Performs an HTTP request (see get_anonymous_public_ip) at import time.
    anonymous_public_ip = get_anonymous_public_ip()
    sentry_sdk.init(
        # NOTE(review): the DSN is empty; presumably this leaves Sentry
        # reporting effectively disabled -- confirm against the Sentry SDK.
        dsn="",
        profiles_sample_rate=1.0,
        traces_sample_rate=1.0,  # For performance monitoring
        send_default_pii=False,  # Don't send personally identifiable information
        attach_stacktrace=False,  # Don't attach stack traces to messages
        default_integrations=False,  # Disable Sentry's default integrations
    )

    # Set up the Tracer Provider: install an SDK TracerProvider unless the
    # current global provider's class is already named "TracerProvider"
    # (the check compares the class name only, not the class itself).
    if not trace.get_tracer_provider().__class__.__name__ == "TracerProvider":
        trace.set_tracer_provider(TracerProvider())
    tracer_provider = trace.get_tracer_provider()

    # New Relic License Key and OTLP Endpoint.
    # Both are placeholder values here (a dummy key and localhost port 0).
    NEW_RELIC_LICENSE_KEY = "dummy_key"
    NEW_RELIC_OTLP_ENDPOINT = "http://localhost:0"
    otlp_exporter = OTLPSpanExporter(
        endpoint=NEW_RELIC_OTLP_ENDPOINT,
        headers={"api-key": NEW_RELIC_LICENSE_KEY},
    )

    # Add the OTLP exporter to the span processor
    span_processor = BatchSpanProcessor(otlp_exporter)
    tracer_provider.add_span_processor(span_processor)

    # Only CRITICAL records from the OTLP exporter logger are let through
    # (presumably to keep export failures from cluttering user logs).
    logging.getLogger("opentelemetry.exporter.otlp").setLevel(logging.CRITICAL)

    # Create the tracer used by capture_red_teamer_run
    tracer = trace.get_tracer(__name__)

    # Initialize PostHog (placeholder API key)
    posthog = Posthog(
        project_api_key="dummy_key",
        host="http://localhost:0",  # invalid address
    )


# Optionally install a global excepthook that forwards uncaught exceptions to
# Sentry. The hook is installed only when error reporting is explicitly
# requested (ERROR_REPORTING=YES) and telemetry has not been opted out of.
# Conditions are ordered cheapest-first so that the network probe performed by
# blocked_by_firewall() only runs once every local check has passed.
if (
    os.getenv("ERROR_REPORTING") == "YES"
    # Legacy opt-out variable: any non-empty value disables error reporting.
    and not os.getenv("TELEMETRY_OPT_OUT")
    # Sentry is only initialised when telemetry is enabled, so honour the same
    # DEEPTEAM_TELEMETRY_OPT_OUT switch here instead of ignoring it.
    and not telemetry_opt_out()
    and not blocked_by_firewall()
):

    def handle_exception(exc_type, exc_value, exc_traceback):
        """Report an uncaught exception, then defer to the default hook.

        Args:
            exc_type: Class of the uncaught exception.
            exc_value: The exception instance; this is what is sent to Sentry.
            exc_traceback: Traceback object passed on to the default hook.
        """
        print({"exc_type": exc_type, "exc_value": exc_value})
        sentry_sdk.capture_exception(exc_value)
        # Keep the interpreter's standard traceback output.
        sys.__excepthook__(exc_type, exc_value, exc_traceback)

    sys.excepthook = handle_exception


def is_running_in_jupyter_notebook():
    """Detect whether the code is executing inside an IPython kernel.

    Imports IPython lazily and looks for ``"IPKernelApp"`` in the active
    shell's config. Every failure along the way (IPython not installed, no
    active shell so ``get_ipython()`` returns None, ...) is treated as
    "not in a notebook".

    Returns:
        bool: True when ``"IPKernelApp"`` is present in the shell config,
        False otherwise.
    """
    try:
        from IPython import get_ipython

        inside_kernel = "IPKernelApp" in get_ipython().config
    except Exception:
        # Deliberately broad: detection is best-effort and must never raise.
        inside_kernel = False
    return inside_kernel


# Environment label reported on spans: "jupyter" inside an IPython kernel,
# "other" everywhere else. Evaluated once, at import time.
if is_running_in_jupyter_notebook():
    IS_RUNNING_IN_JUPYTER = "jupyter"
else:
    IS_RUNNING_IN_JUPYTER = "other"

#########################################################
### Context Managers ####################################
#########################################################


@contextmanager
def capture_red_teamer_run(vulnerabilities: List[str], attacks: List[str]):
    """Record a red-teamer invocation for the duration of a ``with`` block.

    When telemetry is opted out this is a no-op context manager that yields
    None. Otherwise it opens an "Invoked redteamer" span, sends a PostHog
    event with the same name, annotates the span and yields it.

    Args:
        vulnerabilities: Names recorded as ``vulnerability.<name> = 1``.
        attacks: Names recorded as ``attack.<name> = 1``.

    Yields:
        The active span when telemetry is enabled, otherwise None.
    """
    if telemetry_opt_out():
        yield
        return

    event_name = "Invoked redteamer"
    with tracer.start_as_current_span(event_name) as span:
        posthog.capture(get_unique_id(), event_name)
        span.set_attribute("environment", IS_RUNNING_IN_JUPYTER)
        # The status is read between the two get_unique_id() calls on purpose:
        # every get_unique_id() call rewrites DEEPTEAM_STATUS in the file.
        span.set_attribute("user.status", get_status())
        span.set_attribute("user.unique_id", get_unique_id())
        redteaming_status = get_feature_status(Feature.REDTEAMING)
        span.set_attribute("feature_status.redteaming", redteaming_status)

        # One flag attribute per requested vulnerability, then per attack.
        labelled_groups = (
            ("vulnerability", vulnerabilities),
            ("attack", attacks),
        )
        for prefix, names in labelled_groups:
            for name in names:
                span.set_attribute(f"{prefix}.{name}", 1)

        if anonymous_public_ip:
            span.set_attribute("user.public_ip", anonymous_public_ip)
        # Marks the feature as "old" only after its status was reported above.
        set_last_feature(Feature.REDTEAMING)
        yield span


#########################################################
### Helper Functions ####################################
#########################################################


def read_telemetry_file() -> dict:
    """Read the telemetry data file and return its key-value pairs.

    Each line is stripped and split at the first ``=``; everything after it
    (including further ``=`` characters) is the value. A line without ``=``
    yields the whole line as key with an empty value.

    Lines whose key is empty (blank lines, or lines starting with ``=``) are
    skipped. Otherwise they would be stored under the ``""`` key and written
    back as a bare ``=`` line by ``write_telemetry_file``.

    Returns:
        dict: Mapping of keys to string values; empty when the file does not
        exist.
    """
    if not os.path.exists(TELEMETRY_DATA_FILE):
        return {}
    with open(TELEMETRY_DATA_FILE, "r") as file:
        lines = file.readlines()
    data = {}
    for line in lines:
        key, _, value = line.strip().partition("=")
        if key:
            data[key] = value
    return data


def write_telemetry_file(data: dict):
    """Persist key-value pairs to the telemetry data file.

    Creates the file's parent directory if needed, then overwrites the file
    with one ``key=value`` line per entry, in the mapping's iteration order.

    Args:
        data: Mapping to serialise; keys and values are formatted with
            their string representation.
    """
    parent_dir = os.path.dirname(TELEMETRY_DATA_FILE)
    os.makedirs(parent_dir, exist_ok=True)
    with open(TELEMETRY_DATA_FILE, "w") as handle:
        handle.writelines(f"{name}={entry}\n" for name, entry in data.items())


def get_status() -> str:
    """Return the stored user status from the telemetry file.

    Returns:
        str: The value of ``DEEPTEAM_STATUS``, or ``"new"`` when the key is
        not present in the file.
    """
    stored = read_telemetry_file()
    if "DEEPTEAM_STATUS" in stored:
        return stored["DEEPTEAM_STATUS"]
    return "new"


def get_unique_id() -> str:
    """Return the anonymous ID, creating and persisting one if necessary.

    Side effect: the telemetry file is rewritten on every call. When an ID
    is already stored, ``DEEPTEAM_STATUS`` is set to ``"old"``; when a fresh
    UUID4 has to be generated, the status is set to ``"new"``.

    Returns:
        str: The existing or newly generated ID.
    """
    records = read_telemetry_file()
    identifier = records.get("DEEPTEAM_ID")
    if identifier:
        # Returning user: keep the ID, just flag the status.
        records["DEEPTEAM_STATUS"] = "old"
    else:
        # A missing or empty ID counts as a first-time user.
        identifier = str(uuid.uuid4())
        records["DEEPTEAM_ID"] = identifier
        records["DEEPTEAM_STATUS"] = "new"
    write_telemetry_file(records)
    return identifier


def get_last_feature() -> Feature:
    """Return the most recently recorded feature from the telemetry file.

    Returns:
        Feature: The member whose value matches ``DEEPTEAM_LAST_FEATURE``,
        or ``Feature.UNKNOWN`` when the key is missing, empty, or does not
        correspond to any member.
    """
    stored_value = read_telemetry_file().get("DEEPTEAM_LAST_FEATURE")
    try:
        # Lookup by value; a missing (None), empty or unrecognised value
        # raises ValueError.
        return Feature(stored_value)
    except ValueError:
        return Feature.UNKNOWN


def set_last_feature(feature: Feature):
    """Record *feature* as the last used feature and mark it as seen.

    Stores the feature's value under ``DEEPTEAM_LAST_FEATURE`` and sets
    ``DEEPTEAM_<VALUE>_STATUS`` to ``"old"`` in the telemetry file.

    Args:
        feature: The ``Feature`` member to record.

    Raises:
        ValueError: If *feature* is not a ``Feature`` member.
    """
    # isinstance() rather than `feature not in Feature`: enum containment
    # raises TypeError for non-members before Python 3.12 and accepts raw
    # values (e.g. "redteaming") from 3.12 on, which would then fail on
    # `.value` below. This keeps the documented ValueError on all versions.
    if not isinstance(feature, Feature):
        raise ValueError(f"Invalid feature: {feature}")
    data = read_telemetry_file()
    data["DEEPTEAM_LAST_FEATURE"] = feature.value
    feature_status_key = f"DEEPTEAM_{feature.value.upper()}_STATUS"
    data[feature_status_key] = "old"
    write_telemetry_file(data)


def get_feature_status(feature: Feature) -> str:
    """Return whether *feature* has been used before, per the telemetry file.

    Args:
        feature: The ``Feature`` member to look up.

    Returns:
        str: The value stored under ``DEEPTEAM_<VALUE>_STATUS``, or
        ``"new"`` when no such key exists.
    """
    stored = read_telemetry_file()
    status_key = "DEEPTEAM_{}_STATUS".format(feature.value.upper())
    return stored.get(status_key, "new")
